// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, Ordering};

use risingwave_pb::id::{ActorId, FragmentId};

use crate::controller::id::{
    IdCategory, IdCategoryType, IdGeneratorManager as SqlIdGeneratorManager,
};

/// Newtype around a raw `u32` that marks an ID as *global*, i.e. generated by the
/// [`SqlIdGeneratorManager`], as opposed to a local ID coming from the frontend.
///
/// The const parameter `TYPE` tags the wrapper with an ID category, so wrappers of different
/// categories are distinct types.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(super) struct GlobalId<const TYPE: IdCategoryType>(u32);

impl GlobalFragmentId {
    /// Wraps an existing [`FragmentId`] as a global fragment ID, keeping its raw value.
    pub fn new(id: FragmentId) -> Self {
        let raw = id.as_raw_id();
        Self(raw)
    }
}

impl<const TYPE: IdCategoryType> GlobalId<TYPE> {
    /// Converts the wrapped raw value into any ID type `T` that can be built from a `u32`.
    pub fn as_global_id<T: From<u32>>(&self) -> T {
        T::from(self.0)
    }
}

impl<const TYPE: IdCategoryType> From<u32> for GlobalId<TYPE> {
    fn from(id: u32) -> Self {
        Self(id)
    }
}

/// Maps local IDs onto a pre-allocated range of global IDs by shifting them by `offset`.
///
/// The local IDs are required to be exactly a permutation of the range `[0, len)`.
#[derive(Debug, Clone, Copy)]
pub(super) struct GlobalIdGen<ID>
where
    ID: From<u32>,
{
    /// Value added to a local ID to obtain the corresponding global ID.
    offset: u32,
    /// Number of IDs in the range; valid local IDs are strictly below this value.
    len: u32,
    /// Ties the generator to the ID type it produces without storing a value of that type.
    _phantom: PhantomData<ID>,
}

/// A [`GlobalId`] tagged with the `Fragment` ID category.
pub(super) type GlobalFragmentId = GlobalId<{ IdCategory::Fragment }>;
/// A [`GlobalIdGen`] that produces [`GlobalFragmentId`]s.
pub(super) type GlobalFragmentIdGen = GlobalIdGen<GlobalFragmentId>;

/// A [`GlobalIdGen`] that produces [`GlobalId`]s tagged with the `Table` ID category.
pub(super) type GlobalTableIdGen = GlobalIdGen<GlobalId<{ IdCategory::Table }>>;

/// Newtype around a raw `u32` that identifies an actor globally.
///
/// It converts to and from [`ActorId`] and can be built directly from a raw `u32`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(super) struct GlobalActorId(u32);

impl GlobalActorId {
    pub fn new(id: ActorId) -> Self {
        Self(id.as_raw_id())
    }

    pub fn as_global_id(&self) -> ActorId {
        self.0.into()
    }
}

impl From<u32> for GlobalActorId {
    fn from(id: u32) -> Self {
        Self(id)
    }
}

impl<const TYPE: IdCategoryType> GlobalIdGen<GlobalId<TYPE>> {
    /// Pre-allocate a range of IDs with the given `len` and return the generator.
    ///
    /// The start of the range is obtained from `id_gen` via `generate_interval::<TYPE>(len)`
    /// and becomes the generator's `offset`.
    ///
    /// # Panics
    ///
    /// Panics if `len` or the start of the allocated range does not fit in a `u32`.
    pub fn new(id_gen: &SqlIdGeneratorManager, len: u64) -> Self {
        // Reject an oversized request up front, before asking the ID manager for a range.
        let len_u32 = u32::try_from(len).expect("id count exceeds u32::MAX");
        let offset = id_gen.generate_interval::<TYPE>(len);
        // An `as` cast would silently truncate a too-large offset and make the generator
        // hand out IDs outside of the range that was actually allocated.
        let offset = u32::try_from(offset).expect("id offset exceeds u32::MAX");
        Self {
            offset,
            len: len_u32,
            _phantom: PhantomData,
        }
    }
}

/// A [`GlobalIdGen`] that produces [`GlobalActorId`]s.
pub(super) type GlobalActorIdGen = GlobalIdGen<GlobalActorId>;

impl GlobalIdGen<GlobalActorId> {
    /// Reserve `len` consecutive actor IDs from `counter` and return a generator over them.
    ///
    /// The reserved range starts at the counter's current value, and the counter is advanced
    /// by `len` so that subsequent reservations start after this range.
    ///
    /// # Panics
    ///
    /// Panics if `len` exceeds `u32::MAX`, or if advancing `counter` by `len` would overflow
    /// `u32`. In the latter case the counter is left unchanged.
    pub fn new(counter: &AtomicU32, len: u64) -> Self {
        let len_u32 = u32::try_from(len).expect("actor count exceeds u32::MAX");
        // `fetch_add` wraps around silently on overflow, which would restart the counter near
        // zero and hand out IDs that were already reserved. Use a checked addition instead so
        // that exhaustion of the ID space is reported rather than hidden.
        let offset = counter
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                current.checked_add(len_u32)
            })
            .expect("actor id counter overflows u32");
        Self {
            offset,
            len: len_u32,
            _phantom: PhantomData,
        }
    }
}

impl<ID: From<u32>> GlobalIdGen<ID> {
    /// Convert a local ID into its global ID by adding the pre-allocated `offset`.
    ///
    /// # Panics
    ///
    /// Panics if `local_id >= len`, or if the resulting global ID does not fit in a `u32`.
    pub fn to_global_id(&self, local_id: u32) -> ID {
        assert!(
            local_id < self.len,
            "id {} is out of range (len: {})",
            local_id,
            self.len
        );
        // A plain `+` wraps around silently when overflow checks are disabled, which would
        // yield a bogus global ID. Fail loudly regardless of the build profile.
        let global_id = self
            .offset
            .checked_add(local_id)
            .expect("global id overflows u32");
        ID::from(global_id)
    }

    /// Number of IDs in the pre-allocated range.
    pub fn len(&self) -> u32 {
        self.len
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Two generators built from the same counter must receive back-to-back ID ranges, and the
    /// counter must end up just past the second one.
    #[test]
    fn global_actor_id_gen_reserves_unique_ranges() {
        let next_actor_id = AtomicU32::new(10);

        let head = GlobalActorIdGen::new(&next_actor_id, 3);
        assert_eq!(head.len(), 3);
        let tail = GlobalActorIdGen::new(&next_actor_id, 2);
        assert_eq!(tail.len(), 2);

        // `head` starts at the initial counter value (10); `tail` starts right after it (13).
        assert_eq!(head.to_global_id(0).as_global_id(), 10);
        assert_eq!(head.to_global_id(2).as_global_id(), 12);
        assert_eq!(tail.to_global_id(1).as_global_id(), 14);

        // 10 + 3 + 2: the counter has moved past both reserved ranges.
        assert_eq!(next_actor_id.load(Ordering::Relaxed), 15);
    }
}
